package com.chaosj.tools;

import java.io.*;
import java.nio.charset.Charset;
import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

/**
 * @author zhou.kan
 * @date 2022/1/21
 * @apiNote
 */
@Slf4j
public class RemoteShellUtil {

    private Connection conn;
    /** 远程机器IP */
    private String ip;
    /** 用户名 */
    private String osUsername;
    /** 密码 */
    private String password;
    private String charset = Charset.defaultCharset().toString();

    private static final int TIME_OUT = 1000 * 5 * 60;

    public RemoteShellUtil(String ip, String usr, String pasword) {
        this.ip = ip;
        this.osUsername = usr;
        this.password = pasword;
    }


    /**
     * 登录
     * @return
     * @throws IOException
     */
    private boolean login() throws IOException {
        conn = new Connection(ip);
        conn.connect();
        return conn.authenticateWithPassword(osUsername, password);
    }

    /**
     * 执行脚本
     *
     * @param cmds
     * @return
     * @throws Exception
     */
    public int exec(String cmds) throws Exception {
        InputStream stdOut = null;
        InputStream stdErr = null;
        String outStr = "";
        String outErr = "";
        int ret = -1;
        try {
            if (login()) {
                // Open a new {@link Session} on this connection
                Session session = conn.openSession();
                // Execute a command on the remote machine.
                session.execCommand(cmds);
                stdOut = new StreamGobbler(session.getStdout());
                outStr = processStream(stdOut, charset);

                stdErr = new StreamGobbler(session.getStderr());
                outErr = processStream(stdErr, charset);

                session.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);

                if(log.isInfoEnabled()){
                    log.info("outStr is {}",outStr);
                    log.info("outErr is {}",outErr);
                }

                ret = session.getExitStatus();
            } else {
                throw new Exception("登录远程机器失败" + ip); // 自定义异常类 实现略
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
            IOUtils.closeQuietly(stdOut);
            IOUtils.closeQuietly(stdErr);
        }
        return ret;
    }


    /**
     * 执行脚本
     *
     * @param cmds
     * @return
     * @throws Exception
     */
    public int exec2(String cmds) throws Exception {
        InputStream stdOut = null;
        InputStream stdErr = null;
        String outStr = "";
        String outErr = "";
        int ret = -1;
        try {
            if (login()) {
                Session session = conn.openSession();
                // 建立虚拟终端
                session.requestPTY("bash");
                // 打开一个Shell
                session.startShell();
                stdOut = new StreamGobbler(session.getStdout());
                stdErr = new StreamGobbler(session.getStderr());
                BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(stdOut));
                BufferedReader stderrReader = new BufferedReader(new InputStreamReader(stdErr));

                // 准备输入命令
                PrintWriter out = new PrintWriter(session.getStdin());
                // 输入待执行命令
                out.println(cmds);
                out.println("exit");
                // 6. 关闭输入流
                out.close();
                // 7. 等待，除非1.连接关闭；2.输出数据传送完毕；3.进程状态为退出；4.超时
                session.waitForCondition(ChannelCondition.CLOSED | ChannelCondition.EOF | ChannelCondition.EXIT_STATUS , 30000);
                System.out.println("Here is the output from stdout:");
                while (true)
                {
                    String line = stdoutReader.readLine();
                    if (line == null)
                        break;
                    if(log.isInfoEnabled()){
                        log.info("Remote return message is {}", line);
                    }
                }
                System.out.println("Here is the output from stderr:");
                while (true)
                {
                    String line = stderrReader.readLine();
                    if (line == null)
                        break;
                    System.out.println(line);
                }
                /* Show exit status, if available (otherwise "null") */
                if(log.isInfoEnabled()){
                    log.info("ExitCode is {}",session.getExitStatus());
                }
                ret = session.getExitStatus();
                session.close();/* Close this session */
                conn.close();/* Close the connection */

            } else {
                throw new Exception("登录远程机器失败" + ip); // 自定义异常类 实现略
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
            IOUtils.closeQuietly(stdOut);
            IOUtils.closeQuietly(stdErr);
        }
        return ret;
    }

    private String processStream(InputStream in, String charset) throws Exception {
        byte[] buf = new byte[1024];
        StringBuilder sb = new StringBuilder();
        while (in.read(buf) != -1) {
            sb.append(new String(buf, charset));
        }
        return sb.toString();
    }

    /**
     *
     * @param args
     */
    public static void main(String[] args) {
        String binPath = "/home/dq/deploy/kafka/conf-3.0.0/bin";
        String commandStr = "sh " + binPath + "/kafka-reassign-partitions.sh --bootstrap-server 10.5.181.183:7035,10.5.181.184:7036,10.5.181.184:7037 --topics-to-move-json-file " + binPath + "/topics.json --broker-list \"0,1\" --generate  ";
        System.out.println(commandStr);
        RemoteShellUtil executor = new RemoteShellUtil("10.5.181.184", "dq", "dq");
        try {
            int code = executor.exec2(commandStr);
            System.out.println(code);
        } catch (Exception exception) {
            exception.printStackTrace();
        }

    }
}
